pktstream.window(self.window_length, self.sliding_interval).transform(lambda rdd: (rdd.filter(lambda p : (p[1]==str('10008'))).map(lambda p : (p[2:])).map(lambda ((ipv4_dstIP,count)): ((ipv4_dstIP),(count))).reduceByKey(lambda x,y: x+y).filter(lambda ((ipv4_dstIP),(count)): ((float(count)>=10 ))))).foreachRDD(lambda rdd:send_reduction_keys(rdd, (u'localhost', 4949),0,'10008'))
pktstream.window(self.window_length, self.sliding_interval).transform(lambda rdd: (rdd.filter(lambda p : (p[1]==str('10032'))).map(lambda p : (p[2:])).map(lambda ((ipv4_dstIP,count)): ((ipv4_dstIP),(count))).reduceByKey(lambda x,y: x+y).filter(lambda ((ipv4_dstIP),(count)): ((float(count)>=10 )))))
spark_queries[20032].join(spark_queries[10032]).map(lambda ((ipv4_dstIP),(ipv4_totalLen,count)): ((ipv4_dstIP),(float(str(ipv4_totalLen))/float(str(count))))).filter(lambda ((ipv4_dstIP),(count2)): ((float(count2)>=10 ))).foreachRDD(lambda rdd: print("Join " + str(rdd.take(5))))
pktstream.window(self.window_length, self.sliding_interval).transform(lambda rdd: (rdd.filter(lambda p : (p[1]==str('20032'))).map(lambda p : (p[2:])).map(lambda ((ipv4_dstIP,ipv4_totalLen)): ((ipv4_dstIP),(ipv4_totalLen))).reduceByKey(lambda x,y: x+y)))
pktstream.window(self.window_length, self.sliding_interval).transform(lambda rdd: (rdd.filter(lambda p : (p[1]==str('20008'))).map(lambda p : (p[2:])).map(lambda ((ipv4_dstIP,ipv4_totalLen)): ((ipv4_dstIP),(ipv4_totalLen))).reduceByKey(lambda x,y: x+y))).foreachRDD(lambda rdd:send_reduction_keys(rdd, (u'localhost', 4949),0,'20008'))
